#!/usr/bin/env python3
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Splits a branch into smaller branches and uploads CLs."""

import collections
import dataclasses
import hashlib
import math
import os
import re
import tempfile
from typing import List, Set, Tuple, Dict, Any

import gclient_utils
import git_footers
import scm

import git_common as git

# If a call to `git cl split` will generate more than this number of CLs, the
# command will prompt the user to make sure they know what they're doing. Large
# numbers of CLs generated by `git cl split` have caused infrastructure issues
# in the past.
CL_SPLIT_FORCE_LIMIT = 10

# The maximum number of top reviewers to list. `git cl split` may send many CLs
# to a single reviewer, so the top reviewers with the most CLs sent to them
# will be listed.
CL_SPLIT_TOP_REVIEWERS = 5


def Emit(*msg: str):
    """Wrapper for easier mocking during tests"""
    print(*msg)


def EmitWarning(*msg: str):
    print("Warning: ", *msg)


def HashList(lst: List[Any]) -> str:
    """
    Hash a list, returning a positive integer. Lists with identical elements
    should have the same hash, regardless of order.
    """
    # We need a bytes-like object for hashlib algorithms
    byts = bytes().join(
        (action + file).encode() for action, file in sorted(lst))
    # No security implication: we just need a deterministic output
    hashed = hashlib.sha1(byts)
    return hashed.hexdigest()[:10]

FilesAndOwnersDirectory = collections.namedtuple("FilesAndOwnersDirectory",
                                                 "files owners_directories")


@dataclasses.dataclass
class CLInfo:
    """
    Data structure representing a single CL. The script will split the large CL
    into a list of these.

    Fields:
    - reviewers: the reviewers the CL will be sent to.
    - files: a list of <action>, <file> pairs in the CL.
             Has the same format as `git status`.
    - description: a string describing the CL. Typically the list of affected
                   directories. Only used for replacing $description in
                   the user-provided CL description.
    """
    # Have to use default_factory because lists are mutable
    reviewers: Set[str] = dataclasses.field(default_factory=set)
    files: List[Tuple[str, str]] = dataclasses.field(default_factory=list)

    # This is only used for formatting in the CL description, so it just
    # has to be convertible to string.
    description: Any = ""

    def FormatForPrinting(self) -> str:
        """
        Format the CLInfo for printing to a file in a human-readable format.
        """
        # Don't quote the reviewer emails in the output
        reviewers_str = ", ".join(self.reviewers)
        lines = [
            f"Reviewers: [{reviewers_str}]", f"Description: {self.description}"
        ] + [f"{action}, {file}" for (action, file) in self.files]
        return "\n".join(lines)


def CLInfoFromFilesAndOwnersDirectoriesDict(
        d: Dict[Tuple[str], FilesAndOwnersDirectory]) -> List[CLInfo]:
    """
    Transform a dictionary mapping reviewer tuples to FilesAndOwnersDirectories
    into a list of CLInfo
    """
    cl_infos = []
    for (reviewers, fod) in d.items():
        cl_infos.append(
            CLInfo(set(reviewers), fod.files,
                   FormatDirectoriesForPrinting(fod.owners_directories)))
    return cl_infos


def EnsureInGitRepository():
    """Throws an exception if the current directory is not a git repository."""
    git.run('rev-parse')


def GetGitInfo(repository_root, cl) -> Tuple[List[Tuple[str, str]], str, str]:
    """
    Get various information by running git commands.

    Specifically, determine which branch we're on, which upstream we're
    targeting, and the list of changed files (and the associated git actions)
    that make up the CL we're splitting.
    """
    upstream = cl.GetCommonAncestorWithUpstream()
    files = [(action.strip(), f)
             for action, f in scm.GIT.CaptureStatus(repository_root, upstream)]

    refactor_branch = git.current_branch()
    assert refactor_branch, "Can't run from detached branch."
    refactor_branch_upstream = git.upstream(refactor_branch)
    assert refactor_branch_upstream, \
        "Branch %s must have an upstream." % refactor_branch

    return files, refactor_branch, refactor_branch_upstream


def CreateBranchName(prefix: str, files: List[Tuple[str, str]]) -> str:
    """
    Given a sub-CL as a list of (action, file) pairs, create a unique and
    deterministic branch name for it.
    The name has the format <prefix>_<dirname>_<hash(files)>_split.
    """
    file_names = [file for _, file in files]
    if len(file_names) == 1:
        # Only one file, just use its directory as the common path
        common_path = os.path.dirname(file_names[0])
    else:
        common_path = os.path.commonpath(file_names)
    if not common_path:
        # Files have nothing in common at all. Unlikely but possible.
        common_path = "None"
    # Replace path delimiter with underscore in common_path.
    common_path = common_path.replace(os.path.sep, '_')
    return f"{prefix}_{HashList(files)}_{common_path}_split"


def CreateBranchForOneCL(prefix: str, files: List[Tuple[str, str]],
                         upstream: str) -> bool:
    """Creates a branch named |prefix| + "_" + |hash(files)| + "_split".

    Return false if the branch already exists. |upstream| is used as upstream
    for the created branch.
    """
    branches_on_disk = set(git.branches(use_limit=False))
    branch_name = CreateBranchName(prefix, files)
    if branch_name in branches_on_disk:
        return False
    git.run('checkout', '-t', upstream, '-b', branch_name)
    return True


def ValidateExistingBranches(prefix: str, cl_infos: List[CLInfo]) -> bool:
    """
    Check if there are splitting branches left over from a previous run.
    We only allow branches to exist if we're resuming a previous upload,
    in which case we require that the existing branches are a subset of
    the branches we're going to generate.
    """
    branches_on_disk = set(
        branch for branch in git.branches(use_limit=False)
        if branch.startswith(prefix + "_") and branch.endswith("_split"))

    branches_to_be_made = set(
        CreateBranchName(prefix, info.files) for info in cl_infos)

    if not branches_on_disk.issubset(branches_to_be_made):
        Emit("It seems like you've already run `git cl split` on this branch.\n"
             "If you're resuming a previous upload, you must pass in the "
             "same splitting as before, using the --from-file option.\n"
             "If you're starting a new upload, please clean up existing split "
             f"branches (starting with '{prefix}_' and ending with '_split'), "
             "and re-run the tool.")
        Emit("The following branches need to be cleaned up:\n")
        for branch in branches_on_disk - branches_to_be_made:
            Emit(branch)
        return False
    return True


def FormatDirectoriesForPrinting(directories: List[str],
                                 prefix: str = None) -> str:
    """Formats directory list for printing

    Uses dedicated format for single-item list."""

    prefixed = directories
    if prefix:
        prefixed = [(prefix + d) for d in directories]

    return str(prefixed[0]) if len(prefixed) == 1 else str(prefixed)


def FormatDescriptionOrComment(txt, desc):
    """Replaces $description with |desc| in |txt|."""
    # TODO(389069356): Remove support for $directory entirely once it's been
    # deprecated for a while.
    replaced_txt = txt.replace('$directory', desc)
    if txt != replaced_txt:
        EmitWarning('Usage of $directory is deprecated and will be removed '
                    'in a future update. Please use $description instead, '
                    'which has the same behavior by default.\n\n')
    replaced_txt = replaced_txt.replace('$description', desc)
    return replaced_txt


def AddUploadedByGitClSplitToDescription(description, is_experimental=False):
    """Adds a 'This CL was uploaded by git cl split.' line to |description|.

    The line is added before footers, or at the end of |description| if it has
    no footers.
    """
    if is_experimental:
        new_lines = [
            'This CL was uploaded by an experimental version of git cl split',
            '(https://crbug.com/389069356).'
        ]
    else:
        new_lines = ['This CL was uploaded by git cl split.']
    split_footers = git_footers.split_footers(description)
    lines = split_footers[0]
    if lines[-1] and not lines[-1].isspace():
        lines = lines + ['']
    lines = lines + new_lines
    if split_footers[1]:
        lines += [''] + split_footers[1]
    return '\n'.join(lines)


def UploadCl(refactor_branch, refactor_branch_upstream, cl_description, files,
             user_description, saved_splitting_file, comment, reviewers,
             changelist, cmd_upload, cq_dry_run, enable_auto_submit, topic,
             repository_root):
    """Uploads a CL with all changes to |files| in |refactor_branch|.

    Args:
        refactor_branch: Name of the branch that contains the changes to upload.
        refactor_branch_upstream: Name of the upstream of |refactor_branch|.
        cl_description: Description of this specific CL, e.g. the list of
          affected directories.
        files: List of AffectedFile instances to include in the uploaded CL.
        user_description: Description provided by user.
        comment: Comment to post on the uploaded CL.
        reviewers: A set of reviewers for the CL.
        changelist: The Changelist class.
        cmd_upload: The function associated with the git cl upload command.
        cq_dry_run: If CL uploads should also do a cq dry run.
        enable_auto_submit: If CL uploads should also enable auto submit.
        topic: Topic to associate with uploaded CLs.
    """
    # Create a branch.
    if not CreateBranchForOneCL(refactor_branch, files,
                                refactor_branch_upstream):
        Emit(
            f'Skipping existing branch for CL with description: {cl_description}'
        )
        return

    # Checkout all changes to files in |files|.
    deleted_files = []
    modified_files = []
    for action, f in files:
        abspath = os.path.abspath(os.path.join(repository_root, f))
        if action == 'D':
            deleted_files.append(abspath)
        else:
            modified_files.append(abspath)

    if deleted_files:
        git.run(*['rm'] + deleted_files)
    if modified_files:
        git.run(*['checkout', refactor_branch, '--'] + modified_files)

    # Commit changes. The temporary file is created with delete=False so that it
    # can be deleted manually after git has read it rather than automatically
    # when it is closed.
    with gclient_utils.temporary_file() as tmp_file:
        gclient_utils.FileWrite(
            tmp_file,
            FormatDescriptionOrComment(user_description, cl_description))
        git.run('commit', '-F', tmp_file)

    # Upload a CL.
    upload_args = ['-f']
    if reviewers:
        upload_args.extend(['-r', ','.join(sorted(reviewers))])
    if cq_dry_run:
        upload_args.append('--cq-dry-run')
    if not comment:
        upload_args.append('--send-mail')
    if enable_auto_submit:
        upload_args.append('--enable-auto-submit')
    if topic:
        upload_args.append('--topic={}'.format(topic))
    Emit(f'Uploading CL with description: {cl_description} ...')

    ret = cmd_upload(upload_args)
    if ret != 0:
        Emit('Uploading failed.')
        Emit('Note: git cl split has built-in resume capabilities.')
        Emit(f'Delete {git.current_branch()} then run\n'
             f'git cl split --from-file={saved_splitting_file}\n'
             'to resume uploading.')

    if comment:
        changelist().AddComment(FormatDescriptionOrComment(
            comment, cl_description),
                                publish=True)


def GetFilesSplitByOwners(files, max_depth, repository_root):
    """Returns a map of files split by OWNERS file.

    Args:
        files: List of the file paths to be grouped by the OWNERS.
            Note that each path is relative to the repostiory root.
        max_depth: Max depth to traverse from the repository path.
        repository_root: Absolute path to the repository root.

    Returns:
        A map where keys are paths to directories containing an OWNERS file and
        values are lists of files sharing an OWNERS file.
    """
    files_split_by_owners = {}
    for action, path in files:
        # normpath() is important to normalize separators here, in prepration
        # for str.split() before. It would be nicer to use something like
        # pathlib here but alas...
        dir_with_owners = os.path.normpath(os.path.dirname(path))
        if max_depth >= 1:
            dir_with_owners = os.path.join(
                *dir_with_owners.split(os.path.sep)[:max_depth])

        # Find the closest parent directory with an OWNERS file.
        dir_with_owners = os.path.join(repository_root, dir_with_owners)
        while dir_with_owners != repository_root:
            if dir_with_owners in files_split_by_owners:
                break
            owners_path = os.path.join(dir_with_owners, 'OWNERS')
            if os.path.isfile(owners_path):
                break
            if os.path.lexists(owners_path):
                raise ClSplitParseError(
                    f'{owners_path} exists, but is not a file')

            dir_with_owners = os.path.dirname(dir_with_owners)

        files_split_by_owners.setdefault(
            os.path.relpath(dir_with_owners, start=repository_root), []).append(
                (action, path))
    return files_split_by_owners


def PrintClInfo(cl_index, num_cls, cl_description, file_paths, user_description,
                reviewers, cq_dry_run, enable_auto_submit, topic):
    """Prints info about a CL.

    Args:
        cl_index: The index of this CL in the list of CLs to upload.
        num_cls: The total number of CLs that will be uploaded.
        cl_description: Description of this specific CL, e.g. the list of
          affected directories.
        file_paths: A list of files in this CL.
        user_description: Description provided by user.
        reviewers: A set of reviewers for this CL.
        cq_dry_run: If the CL should also be sent to CQ dry run.
        enable_auto_submit: If the CL should also have auto submit enabled.
        topic: Topic to set for this CL.
    """
    description_lines = FormatDescriptionOrComment(user_description,
                                                   cl_description).splitlines()
    indented_description = '\n'.join(['    ' + l for l in description_lines])

    Emit('CL {}/{}'.format(cl_index, num_cls))
    Emit('Paths: {}'.format(cl_description))
    Emit('Reviewers: {}'.format(', '.join(reviewers)))
    Emit('Auto-Submit: {}'.format(enable_auto_submit))
    Emit('CQ Dry Run: {}'.format(cq_dry_run))
    Emit('Topic: {}'.format(topic))
    Emit('\n' + indented_description + '\n')
    Emit('\n'.join(file_paths))


def LoadDescription(description_file, dry_run):
    if not description_file:
        if not dry_run:
            # Parser checks this as well, so should be impossible
            raise ValueError(
                "Must provide a description file except during dry runs")
        return ('Dummy description for dry run.\n'
                'description = $description')

    return gclient_utils.FileRead(description_file)


def ProcessDescription(description_file: str, dry_run: bool,
                       target_range: bool) -> str:
    """
    Load the provided description, append the note about git cl split, and
    (on a real run), validate that it contains a bug link.

    Returns the loaded description, or None if the user aborted due to a
    missing bug link.
    """
    description = LoadDescription(description_file, dry_run)

    description = AddUploadedByGitClSplitToDescription(
        description, is_experimental=target_range)

    if not dry_run and not CheckDescriptionBugLink(description):
        return None

    return description

def PrintSummary(cl_infos, refactor_branch):
    """Print a brief summary of the splitting so the user
       can review it before uploading.

    Args:
       files_split_by_reviewers: A dictionary mapping reviewer tuples
           to the files and directories assigned to them.
    """
    for info in cl_infos:
        Emit(f'Reviewers: {info.reviewers}, files: {len(info.files)}, '
             f'description: {info.description}')

    num_cls = len(cl_infos)
    Emit(f'\nWill split branch {refactor_branch} into {num_cls} CLs. '
         'Please quickly review them before proceeding.\n')

    if (num_cls > CL_SPLIT_FORCE_LIMIT):
        EmitWarning(
            'Uploading this many CLs may potentially '
            'reach the limit of concurrent runs, imposed on you by the '
            'build infrastructure. Your runs may be throttled as a '
            'result.\n\nPlease email infra-dev@chromium.org if you '
            'have any questions. '
            'The infra team reserves the right to cancel '
            'your jobs if they are overloading the CQ.\n\n'
            '(Alternatively, you can reduce the number of CLs created by '
            'using the --max-depth option, or altering the arguments to '
            '--target-range, as appropriate. Pass --dry-run to examine the '
            'CLs which will be created until you are happy with the '
            'results.)')


def SummarizeAndValidate(dry_run: bool, summarize: bool,
                         files: List[Tuple[str, str]], refactor_branch: str,
                         cl_infos: List[CLInfo]) -> Tuple[List[CLInfo], str]:
    """
    Print a summary of the generated splitting for the user. If we're doing a
    real run, prompt the user to confirm the splitting is acceptable, and
    allow them to edit it if they wish.

    If we're doing a real run, also save the splitting to a file so the user
    can safely resume an aborted upload with the same splitting.

    Arguments:
    dry_run: Whether or not we're doing a dry run
    summarize: If we're doing a dry run, should we print a concise summary first
    files: The list of (action, file) pairs that make up the CL we're splitting
    refactor_branch: Name of the branch we're splitting

    Returns:
    A pair of the edited cl_infos and the name of the file to which we saved
    the splitting. If the user aborts, the edited cl_infos will be falsy.
    """
    if not dry_run or summarize:
        PrintSummary(cl_infos, refactor_branch)

    if dry_run:
        return cl_infos, ""

    answer = gclient_utils.AskForData(
        'Proceed? (y/N, or i to edit interactively): ')

    if answer.lower() == 'i':
        cl_infos, saved_splitting_file = EditSplittingInteractively(
            cl_infos, files_on_disk=files)
    else:
        # Save so the user can use the splitting later if they want to
        saved_splitting_file = SaveSplittingToTempFile(cl_infos)
        if answer.lower() != 'y':
            return None, saved_splitting_file

    # Make sure there isn't any clutter left over from a previous run
    if not ValidateExistingBranches(refactor_branch, cl_infos):
        return None, saved_splitting_file

    return cl_infos, saved_splitting_file


def ComputeSplitting(
    from_file: str,
    files: List[Tuple[str, str]],
    target_range: Tuple[int, int],
    max_depth: int,
    reviewers_override: List[str],
    expect_owners_override: bool,
    cl,
    repository_root: str,
) -> List[CLInfo]:
    """
    Split the current CL into sub-CLs by partitioning the files and assigning
    reviewers. The method used depends on the command-line arguments.

    Arguments are the same as SplitCl, excecpt for the following:
    cl: Changelist class instance, for calling owners methods
    """
    author = git.run('config', 'user.email').strip() or None

    if from_file:
        # Load a precomputed splitting
        cl_infos = LoadSplittingFromFile(from_file, files_on_disk=files)
    elif target_range:
        # Use the directory-based clustering algorithm
        min_files, max_files = target_range
        cl_infos = GroupFilesByDirectory(cl, author, expect_owners_override,
                                         files, min_files, max_files)
    else:
        # Use the default algorithm
        files_split_by_reviewers = SelectReviewersForFiles(
            cl, author, files, max_depth, repository_root)

        cl_infos = CLInfoFromFilesAndOwnersDirectoriesDict(
            files_split_by_reviewers)

    # Note that we do this override even if the list is empty (indicating that
    # the user requested CLs not be assigned to any reviewers).
    if reviewers_override != None:
        for info in cl_infos:
            info.reviewers = set(reviewers_override)

    return cl_infos


def SplitCl(description_file, comment_file, changelist, cmd_upload, dry_run,
            summarize, reviewers_override, cq_dry_run, enable_auto_submit,
            max_depth, topic, target_range, expect_owners_override, from_file,
            repository_root):
    """"Splits a branch into smaller branches and uploads CLs.

    Args:
        description_file: File containing the description of uploaded CLs.
        comment_file: File containing the comment of uploaded CLs.
        changelist: The Changelist class.
        cmd_upload: The function associated with the git cl upload command.
        dry_run: Whether this is a dry run (no branches or CLs created).
        reviewers_override: Either None or a (possibly empty) list of reviewers
            all CLs should be sent to.
        cq_dry_run: If CL uploads should also do a cq dry run.
        enable_auto_submit: If CL uploads should also enable auto submit.
        max_depth: The maximum directory depth to search for OWNERS files. A
            value less than 1 means no limit.
        topic: Topic to associate with split CLs.
        repository_root: Absolute path of the repository root.

    Returns:
        0 in case of success. 1 in case of error.
    """

    EnsureInGitRepository()
    cl = changelist()
    # Get the list of changed files, as well as the branch we're on and its
    # upstream.
    files, refactor_branch, refactor_branch_upstream = GetGitInfo(
        repository_root, cl)

    if not files:
        Emit('Cannot split an empty CL.')
        return 1

    # Load and validate the description and comment files now, so we can error
    # early if there's a problem with them.
    comment = gclient_utils.FileRead(comment_file) if comment_file else None
    description = ProcessDescription(description_file, dry_run, target_range)
    if not description:
        return 0

    cl_infos = ComputeSplitting(from_file, files, target_range, max_depth,
                                reviewers_override, expect_owners_override, cl,
                                repository_root)

    cl_infos, saved_splitting_file = SummarizeAndValidate(
        dry_run, summarize, files, refactor_branch, cl_infos)
    # If the user aborted, we're done
    if not cl_infos:
        return 0

    cls_per_reviewer = collections.defaultdict(int)
    for cl_index, cl_info in enumerate(cl_infos, 1):
        if dry_run and summarize:
            pass
        elif dry_run:
            file_paths = [f for _, f in cl_info.files]
            PrintClInfo(cl_index, len(cl_infos), cl_info.description,
                        file_paths, description, cl_info.reviewers, cq_dry_run,
                        enable_auto_submit, topic)
        else:
            UploadCl(refactor_branch, refactor_branch_upstream,
                     cl_info.description, cl_info.files, description,
                     saved_splitting_file, comment, cl_info.reviewers,
                     changelist, cmd_upload, cq_dry_run, enable_auto_submit,
                     topic, repository_root)

        for reviewer in cl_info.reviewers:
            cls_per_reviewer[reviewer] += 1

    # List the top reviewers that will be sent the most CLs as a result of
    # the split.
    reviewer_rankings = sorted(cls_per_reviewer.items(),
                               key=lambda item: item[1],
                               reverse=True)
    Emit('The top reviewers are:')
    for reviewer, count in reviewer_rankings[:CL_SPLIT_TOP_REVIEWERS]:
        Emit(f'    {reviewer}: {count} CLs')

    if dry_run:
        # Wait until now to save the splitting so the file name doesn't get
        # washed away by the flood of dry-run printing.
        SaveSplittingToTempFile(cl_infos)

    # Go back to the original branch.
    git.run('checkout', refactor_branch)
    return 0


def CheckDescriptionBugLink(description):
    """Verifies that the description contains a bug link.

    Examples:
        Bug: 123
        Bug: chromium:456

    Prompts user if the description does not contain a bug link.
    """
    bug_pattern = re.compile(r"^Bug:\s*(?:[a-zA-Z]+:)?[0-9]+", re.MULTILINE)
    matches = re.findall(bug_pattern, description)
    answer = 'y'
    if not matches:
        answer = gclient_utils.AskForData(
            'Description does not include a bug link. Proceed? (y/N):')
    return answer.lower() == 'y'


def SelectReviewersForFiles(cl, author, files, max_depth, repository_root):
    """Selects reviewers for passed-in files

    Args:
        cl: Changelist class instance
        author: Email of person running 'git cl split'
        files: List of files
        max_depth: The maximum directory depth to search for OWNERS files.
            A value less than 1 means no limit.
        repository_root: Absolute path of the repository root
    """
    info_split_by_owners = GetFilesSplitByOwners(files, max_depth,
                                                 repository_root)

    info_split_by_reviewers = {}

    for (directory, split_files) in info_split_by_owners.items():
        # Use '/' as a path separator in the branch name and the CL description
        # and comment.
        directory = directory.replace(os.path.sep, '/')
        file_paths = [f for _, f in split_files]
        # Convert reviewers list to tuple in order to use reviewers as key to
        # dictionary.
        reviewers = tuple(
            cl.owners_client.SuggestOwners(
                file_paths, exclude=[author, cl.owners_client.EVERYONE]))

        if not reviewers in info_split_by_reviewers:
            info_split_by_reviewers[reviewers] = FilesAndOwnersDirectory([], [])
        info_split_by_reviewers[reviewers].files.extend(split_files)
        info_split_by_reviewers[reviewers].owners_directories.append(directory)

    return info_split_by_reviewers


################################################################################
# Code for saving, editing, and loading splittings.
################################################################################

def SaveSplittingToFile(cl_infos: List[CLInfo], filename: str, silent=False):
    """
    Writes the listed CLs to the designated file, in a human-readable and
    editable format. Include an explanation of the file format at the top,
    as well as instructions for how to use it.
    """
    preamble = (
        "# CLs in this file must have the following format:\n"
        "# A 'Reviewers: [...]' line, where '...' is a (possibly empty) list "
        "of reviewer emails.\n"
        "# A 'Description: ...' line, where '...' is any string (by default, "
        "the list of directories the files have been pulled from).\n"
        "# One or more file lines, consisting of an <action>, <file> pair, in "
        "the format output by `git status`.\n\n"
        "# Each 'Reviewers' line begins a new CL.\n"
        "# To use the splitting in this file, use the --from-file option.\n\n")

    cl_string = "\n\n".join([info.FormatForPrinting() for info in cl_infos])
    gclient_utils.FileWrite(filename, preamble + cl_string)
    if not silent:
        Emit(f"Saved splitting to {filename}")


def SaveSplittingToTempFile(cl_infos: List[CLInfo], silent=False):
    """
    Create a file in the user's temp directory, and save the splitting there.
    """
    # We can't use gclient_utils.temporary_file because it will be removed
    temp_file, temp_name = tempfile.mkstemp(prefix="split_cl_")
    os.close(temp_file)  # Necessary for windows
    SaveSplittingToFile(cl_infos, temp_name, silent)
    return temp_name


class ClSplitParseError(Exception):
    pass


# Matches 'Reviewers: [...]', extracts the ...
reviewers_re = re.compile(r'Reviewers:\s*\[([^\]]*)\]')
# Matches 'Description: ...', extracts the ...
description_re = re.compile(r'Description:\s*(.+)')
# Matches '<action>, <file>', and extracts both
# <action> must be a valid code (either 1 or 2 letters)
file_re = re.compile(r'([MTADRC]{1,2}),\s*(.+)')

# We use regex parsing instead of e.g. json because it lets us use a much more
# human-readable format, similar to the summary printed in dry runs
def ParseSplittings(lines: List[str]) -> List[CLInfo]:
    """
    Parse a splitting file. We expect to get a series of lines in the format
    of CLInfo.FormatForPrinting. In the following order, we expect to see
    - A 'Reviewers: ' line containing a list,
    - A 'Description: ' line containing anything, and
    - A list of <action>, <path> pairs, each on its own line

    Note that this function only transforms the file into a list of CLInfo
    (if possible). It does not validate the information; for that, see
    ValidateSplitting.
    """

    cl_infos = []
    current_cl_info = None
    for line in lines:
        line = line.strip()

        # Skip empty or commented lines
        if not line or line.startswith('#'):
            continue

        # Start a new CL whenever we see a new Reviewers: line
        m = re.fullmatch(reviewers_re, line)
        if m:
            reviewers_str = m.group(1)
            reviewers = [r.strip() for r in reviewers_str.split(",")]
            # Account for empty list or trailing comma
            if not reviewers[-1]:
                reviewers = reviewers[:-1]

            if current_cl_info:
                cl_infos.append(current_cl_info)

            current_cl_info = CLInfo(reviewers=reviewers)
            continue

        if not current_cl_info:
            # Make sure no nonempty lines appear before the first CL
            raise ClSplitParseError(
                f"Error: Line appears before the first 'Reviewers: ' line:\n{line}"
            )

        # Description is just used as a description, so any string is fine
        m = re.fullmatch(description_re, line)
        if m:
            if current_cl_info.description:
                raise ClSplitParseError(
                    f"Error parsing line: CL already has a description entry\n{line}"
                )
            current_cl_info.description = m.group(1).strip()
            continue

        # Any other line is presumed to be an '<action>, <file>' pair
        m = re.fullmatch(file_re, line)
        if m:
            action, path = m.groups()
            current_cl_info.files.append((action, path))
            continue

        raise ClSplitParseError("Error parsing line: Does not look like\n"
                                "'Reviewers: [...]',\n"
                                "'Description: ...', or\n"
                                f"a pair of '<action>, <file>':\n{line}")

    if (current_cl_info):
        cl_infos.append(current_cl_info)

    return cl_infos


def ValidateSplitting(cl_infos: List[CLInfo], filename: str,
                      files_on_disk: List[Tuple[str, str]]):
    """
    Ensure that the provided list of CLs is a valid splitting.

    Specifically, check that:
    - Each file is in at most one CL
    - Each file and action appear in the list of changed files reported by git
    - Warn if some files don't appear in any CL
    - Warn if a reviewer string looks wrong, or if a CL is empty
    """
    # Validate the parsed information
    if not cl_infos:
        EmitWarning("No CLs listed in file. No action will be taken.")
        return []

    files_in_loaded_cls = set()
    # Collect all files, ensuring no duplicates
    # Warn on empty CLs or invalid reviewer strings
    for info in cl_infos:
        if not info.files:
            EmitWarning("CL has no files, and will be skipped:\n",
                        info.FormatForPrinting())
        for file_info in info.files:
            if file_info in files_in_loaded_cls:
                raise ClSplitParseError(
                    f"File appears in multiple CLs in {filename}:\n{file_info}")

            files_in_loaded_cls.add(file_info)
        for reviewer in info.reviewers:
            if not (re.fullmatch(r"[^@]+@[^.]+\..+", reviewer)):
                EmitWarning("reviewer does not look like an email address: ",
                            reviewer)

    # Strip empty CLs
    cl_infos = [info for info in cl_infos if info.files]

    # Ensure the files in the user-provided CL splitting match the files
    # that git reports.
    # Warn if not all the files git reports appear.
    # Fail if the user mentions a file that isn't reported by git
    files_on_disk = set(files_on_disk)
    if not files_in_loaded_cls.issubset(files_on_disk):
        extra_files = files_in_loaded_cls.difference(files_on_disk)
        extra_files_str = "\n".join(f"{action}, {file}"
                                    for (action, file) in extra_files)
        raise ClSplitParseError(
            f"Some files are listed in {filename} but do not match any files "
            f"listed by git:\n{extra_files_str}")

    unmentioned_files = files_on_disk.difference(files_in_loaded_cls)
    if (unmentioned_files):
        EmitWarning(
            "the following files are not included in any CL in {filename}. "
            "They will not be uploaded:")
        for file in unmentioned_files:
            Emit(file)


def LoadSplittingFromFile(filename: str,
                          files_on_disk: List[Tuple[str, str]]) -> List[CLInfo]:
    """
    Given a file and the list of <action>, <file> pairs reported by git,
    read the file and return the list of CLInfos it contains.
    """
    lines = gclient_utils.FileRead(filename).splitlines()

    cl_infos = ParseSplittings(lines)
    ValidateSplitting(cl_infos, filename, files_on_disk)

    return cl_infos


def EditSplittingInteractively(
        cl_infos: List[CLInfo],
        files_on_disk: List[Tuple[str, str]]) -> Tuple[List[CLInfo], str]:
    """
    Allow the user to edit the generated splitting using their default editor.
    Make sure the edited splitting is saved so they can retrieve it if needed.
    """

    tmp_file = SaveSplittingToTempFile(cl_infos, silent=True)
    splitting = gclient_utils.RunEditor(gclient_utils.FileRead(tmp_file), False)
    cl_infos = ParseSplittings(splitting.splitlines())

    # Save the edited splitting before validation, so the user can go back
    # and edit it if there are any typos
    SaveSplittingToFile(cl_infos, tmp_file)
    ValidateSplitting(cl_infos, "the provided splitting", files_on_disk)
    return cl_infos, tmp_file


################################################################################
# Code for the clustering-based splitting algorithm.
################################################################################


def GroupFilesByDirectory(cl, author: str, expect_owners_override: bool,
                          all_files: Tuple[str, str], min_files: int,
                          max_files: int) -> List[CLInfo]:
    """
    Group the contents of |all_files| into clusters of size between |min_files|
    and |max_files|, inclusive, based on their directory structure. Assign one
    reviewer to each group to create a CL. If |expect_owners_override| is true,
    consider only the directory structure of the files, ignoring ownership.

    May rarely create groups with fewer than |min_files| files, or assign
    multiple reviewers to a single CL.

    Args:
        cl: Changelist class instance, for calling owners methods
        author: Email of person running the script; never assigned as a reviewer
    """

    # Record the actions associated with each file because the clustering
    # algorithm just takes filenames
    actions_by_file = {}
    file_paths = []
    for (action, file) in all_files:
        actions_by_file[file] = action
        file_paths.append(file)

    reviewers_so_far = []
    cls = []
    # Go through the clusters by path length so that we're likely to choose
    # top-level owners earlier
    for (directories, files) in sorted(
            ClusterFiles(expect_owners_override, file_paths, min_files,
                         max_files)):
        # Use '/' as a path separator in the branch name and the CL description
        # and comment.
        directories = [
            directory.replace(os.path.sep, '/') for directory in directories
        ]
        files_with_actions = [(actions_by_file[file], file) for file in files]

        # Try to find a reviewer. If some of the files have noparent set,
        # we'll likely get multiple reviewers. Don't consider reviewers we've
        # already assigned something to.
        # FIXME: Rather than excluding existing reviewers, it would be better
        # to just penalize them, but still choose them over reviewers who have
        # a worse score. At the moment, owners_client doesn't support anything
        # to do with the score.
        reviewers = cl.owners_client.SuggestMinimalOwners(
            files,
            exclude=[author, cl.owners_client.EVERYONE] + reviewers_so_far)

        # Retry without excluding existing reviewers if we couldn't find any.
        # This is very unlikely since there are many fallback owners.
        if not reviewers:
            reviewers = cl.owners_client.SuggestMinimalOwners(
                directories, exclude=[author, cl.owners_client.EVERYONE])

        reviewers_so_far.extend(reviewers)
        cls.append(
            CLInfo(set(reviewers), files_with_actions,
                   FormatDirectoriesForPrinting(directories)))

    return cls


### Trie Code


def FolderHasParent(path: str) -> bool:
    """
    Check if a folder inherits owners from a higher-level directory:
    i.e. it's not at top level, and doesn't have an OWNERS file that contains
    `set noparent`
    """
    # Treat each top-leve directory as having no parent, as well as the root
    # directory.
    if len(path.split(os.path.sep)) <= 1:
        # Top level
        return False

    owners_file = os.path.join(path, 'OWNERS')
    if (os.path.isfile(owners_file)):
        with (open(owners_file)) as f:
            for line in f.readlines():

                # Strip whitespace and comments
                line = line.split('#')[0].strip()

                if (line == 'set noparent'):
                    return False

    return True


class DirectoryTrie():
    """
    Trie structure: Nested dictionaries representing file paths.
    Each level represents one folder, and contains:
    - The path to that folder (its prefix)
    - A list of files that reside in that folder
    - A boolean for whether that folder inherits owners from a parent folder
    - One Trie representing each of that folder's subdirectories

    Files are stored with their entire path, so we don't need to reconstruct
    it every time we read them.
    """

    def __init__(self, expect_owners_override, prefix: str = ""):
        """ Create an empty DirectoryTrie with the specified prefix """
        has_parent = expect_owners_override or FolderHasParent(prefix)
        # yapf: disable
        self.subdirectories : Dict[str, DirectoryTrie] = {}
        self.files          : List[str]                = []
        self.prefix         : str                      = prefix
        self.has_parent     : bool                     = has_parent
        self.expect_owners_override : bool             = expect_owners_override
        # yapf: enable

    def AddFile(self, path: List[str]):
        """
        Add a file to the Trie, adding new subdirectories if necessary.
        The file should be represented as a list of directories, with the final
        entry being the filename.
        """
        if len(path) == 1:
            self.files.append(os.path.join(self.prefix, path[0]))
        else:
            directory = path[0]
            if directory not in self.subdirectories:
                prefix = os.path.join(self.prefix, directory)
                self.subdirectories[directory] = DirectoryTrie(
                    self.expect_owners_override, prefix)
            self.subdirectories[directory].AddFile(path[1:])

    def AddFiles(self, paths: List[List[str]]):
        """ Convenience function to add many files at once. """
        for path in paths:
            self.AddFile(path)

    def ToList(self) -> List[str]:
        """ Return a list of all files in the trie. """
        files = []
        files += self.files
        for subdir in self.subdirectories.values():
            files += subdir.ToList()
        return files


### Clustering code

# Convenience type: a "bin" represents a collection of files:
# it tracks their prefix(es) and the list of files themselves.
# Both elements are string lists.
Bin = collections.namedtuple("Bin", "prefixes files")


def PackFiles(max_size: int, files_to_pack: List[Bin]) -> List[Bin]:
    """
    Simple bin packing algorithm: given a list of small bins, consolidate them
    into as few larger bins as possible, where each bin can hold at most
    |max_size| files.
    """
    bins = []
    # Guess how many bins we'll need ahead of time so we can spread things
    # between them. We'll add more bins later if necessary
    expected_bins_needed = math.ceil(
        sum(len(bin.files) for bin in files_to_pack) / max_size)
    expected_avg_bin_size = math.ceil(
        sum(len(bin.files) for bin in files_to_pack) / expected_bins_needed)
    for _ in range(expected_bins_needed):
        bins.append(Bin([], []))

    # Sort by number of files, decreasing
    sorted_by_num_files = sorted(files_to_pack, key=lambda bin: -len(bin.files))

    # Invariant: the least-filled bin is always the first element of |bins|
    # This ensures we spread things between bins as much as possible.
    for (prefixes, files) in sorted_by_num_files:
        b = bins[0]
        if len(b.files) + len(files) <= max_size:
            b[0].extend(prefixes)
            b[1].extend(files)
        else:
            # Since the first bin is the emptiest, if we failed to fit in
            # that we don't need to try any others.

            # If these files alone are too large, split them up into
            # groups of size |expected_avg_bin_size|
            if len(files) > max_size:
                bins.extend([
                    Bin(prefixes, files[i:i + expected_avg_bin_size])
                    for i in range(0, len(files), expected_avg_bin_size)
                ])
            else:
                bins.append(Bin(prefixes, files))

        # Maintain invariant
        bins.sort(key=lambda bin: len(bin.files))
    return [bin for bin in bins if len(bin.files) > 0]


def ClusterFiles(expect_owners_override: bool, files: List[str], min_files: int,
                 max_files: int) -> List[Bin]:
    """
    Group the entries of |files| into clusters of size between |min_files| and
    |max_files|, inclusive. Guarantees that the size does not exceed
    |max_files|, but the size may rarely be less than |min_files|. If
    |expect_owners_override| is true, don't consider ownership when clustering,
    only directory structure.

    Clustering strategy for a given directory:
    1. Try to group each subdirectory independently
    2. Group any remaining files as follows:
        2a. If there are less than |min_files| files and the folder has a parent,
            give up and let the parent folder handle it.
        2c. Otherwise, if there are at most |max_files| files, create one
            cluster.
        2c. Finally, if there are more than |max_files| files, create several
            clusters of size less than |max_files|.
    """
    trie = DirectoryTrie(expect_owners_override)
    trie.AddFiles([file.split(os.path.sep) for file in files])
    clusters: List[Bin] = []

    def ClusterDirectory(current_dir: DirectoryTrie) -> List[str]:
        """
        Attempt to cluster the files for a directory, by grouping them into
        Bins and appending the bins to |clusters|.
        Returns a list of files that weren't able to be clustered (because
        there weren't at least |min_files| files).
        """
        # Track all the files we need to handle in this directory
        unclustered_files: List[Bin] = []

        # Record any files that live in this directory directly
        if len(current_dir.files) > 0:
            unclustered_files.append(
                Bin([current_dir.prefix], current_dir.files))

        # Step 1: Try to cluster each subdirectory independently
        for subdir in current_dir.subdirectories.values():
            unclustered_files_in_subdir = ClusterDirectory(subdir)
            # If not all files were submitted, record them
            if len(unclustered_files_in_subdir) > 0:
                unclustered_files.append(
                    Bin([subdir.prefix], unclustered_files_in_subdir))

        # A flattened list containing just the names of all unclustered files
        unclustered_files_names_only = [
            file for bin in unclustered_files for file in bin.files
        ]

        if len(unclustered_files_names_only) == 0:
            return []

        # Step 2a: If we don't have enough files for a cluster and it's possible
        # to recurse upward, do so
        if (len(unclustered_files_names_only) < min_files
                and current_dir.has_parent):
            return unclustered_files_names_only

        # Step 2b, 2c: Create one or more clusters from the unclustered files
        # by appending to the |clusters| variable in the outer scope
        nonlocal clusters
        if len(unclustered_files_names_only) <= max_files:
            clusters.append(
                Bin([current_dir.prefix], unclustered_files_names_only))
        else:
            clusters += PackFiles(max_files, unclustered_files)

        return []

    unclustered_paths = ClusterDirectory(trie)
    if (len(unclustered_paths) > 0):
        EmitWarning(
            'Not all files were assigned to a CL!\n'
            'This should be impossible, file a bug.\n'
            f'{len(unclustered_paths)} Unassigned files: {unclustered_paths}')

    return clusters
